 1.Kafka源码剖析之Consumer消费者流程中Consumer示例
   
   KafkaConsumer
   消费者的根本目的是从Kafka服务端拉取消息,并交给业务逻辑进行处理。
   开发人员不必关心与Kafka服务端之间⽹络连接的管理、心跳检测、请求超时重试等底层操作
   也不必关心订阅Topic的分区数量、分区Leader副本的网络拓扑以及消费组的Rebalance等细节,
   另外还提供了自动提交offset的功能。
   案例
   
   public static void main(String[] args) throws InterruptedException {
// 是否自动提交
        Boolean autoCommit = false;
// 是否异步提交
        Boolean isSync = true;
        Properties props = new Properties();
// kafka地址,列表格式为host1:port1,host2:port2,…,⽆需添加所有的集群地址,kafka会根据
// 提供的地址发现其他的地址（建议多提供几个,以防提供的服务器关闭）
        props.put("bootstrap.servers", "localhost:9092");
// 消费组
        props.put("group.id", "test");
// 开启自动提交offset
        props.put("enable.auto.commit", autoCommit.toString());
// 1s自动提交
        props.put("auto.commit.interval.ms", "1000");
// 消费者和群组协调器的最大心跳时间,如果超过该时间则认为该消费者已经死亡或者故障,需要踢出
// 消费者组
        props.put("session.timeout.ms", "60000");
        // 一次poll间隔最大时间
        props.put("max.poll.interval.ms", "1000");
// 当消费者读取偏移量无效的情况下,需要重置消费起始位置,默认为latest（从消费者启动后生成的
// 记录）,另外一个选项值是 earliest,将从有效的最小位移位置开始消费
        props.put("auto.offset.reset", "latest");
// consumer端一次拉取数据的最大字节数
        props.put("fetch.max.bytes", "1024000");
// key序列化方式
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
// value序列化方式
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "lagou_edu";
// 订阅topic列表
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
// 消息拉取
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
            if (!autoCommit) {
                if (isSync) {
// 处理完成单次消息以后,提交当前的offset,如果失败会一直重试直到成功
                    consumer.commitSync();
                } else {
// 异步提交
                    consumer.commitAsync((offsets, exception) -> {
                        exception.printStackTrace();
                        System.out.println(offsets.size());
                    });
                }
            }
            TimeUnit.SECONDS.sleep(3);
        }
    }
   Kafka服务端并不会记录消费者的消费位置,而是由消费者自己决定如何保存如何记录其消费的offset。在Kafka
服务端中添加了一个名为“__consumer_offsets"的内部topic来保存消费者提交的offset,当出现消费者上、下线时会
触发Consumer Group进行Rebalance操作,对分区进行重新分配,待Rebalance操作完成后。消费者就可以读取该topic
中记录的offset,并从此offset位置继续消费.当然,使用该topic记录消费者的offset只是默认选项,开发人员可以根
据业务需求将offset记录在别的存储中。
   在消费者消费消息的过程中,提交offset的时机非常重要,因为它决定了消费者故障重启后的消费位置.在上面的
示例中,我们通过将enable.auto.commit选项设置为true可以起到⾃动提交offset的功能,auto.commit.interval.ms  
选项则设置了自动提交的时间间隔。每次在调用 KafkaConsumer.poll()方法时都会检测是否需要自动提交,并提交
上次poll()方法返回的最后⼀个消息的offset。为了避免消息丢失,建议poll()方法之前要处理完上次poll()方法拉
取的全部消息。KafkaConsumer中还提供了两个自动提交offset的方法,分别是commitSync()和commitAsync(),它们
都可以指定提交的offset值,区别在于前者是同步提交,后者是异步提交。
 
 2.KafkaConsumer实例化
   
   了解了KafkaConsumer的基本使用,开始深入了解KafkaConsumer原理和实现,先看一下构造方法核心逻辑
   
   private KafkaConsumer(ConsumerConfig config,
                          Deserializer<K> keyDeserializer,
                          Deserializer<V> valueDeserializer) {
        try {
// 获取client.id,如果为空则默认生成一个,默认：consumer-1
            String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
            if (clientId.isEmpty())
                clientId = "consumer-" +
                        CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
            this.clientId = clientId;
// 获取消费组名
            String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
            LogContext logContext = new LogContext("[Consumer clientId=" + clientId
                    + ", groupId=" + groupId + "] ");
            this.log = logContext.logger(getClass());
            log.debug("Initializing the Kafka consumer");
            this.requestTimeoutMs =
                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            int sessionTimeOutMs =
                    config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
            int fetchMaxWaitMs =
                    config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
            if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs
                    <= fetchMaxWaitMs)
                throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG +
                        " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " +
                        ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
            this.time = Time.SYSTEM;
// 与生产者逻辑相同
            Map<String, String> metricsTags = Collections.singletonMap("client-id",
                    clientId);
            MetricConfig metricConfig = new
                    MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                            TimeUnit.MILLISECONDS)
                    .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_R
                            ECORDING_LEVEL_CONFIG)))
                    .tags(metricsTags);
            List<MetricsReporter> reporters =
                    config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                            MetricsReporter.class);
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);
            this.retryBackoffMs =
                    config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
// 消费者拦截器
// load interceptors and make sure they get clientId
            Map<String, Object> userProvidedConfigs = config.originals();
            userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
            List<ConsumerInterceptor<K, V>> interceptorList = (List) (new
                    ConsumerConfig(userProvidedConfigs,
                    false)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ConsumerInterceptor.class);
            this.interceptors = interceptorList.isEmpty() ? null : new
                    ConsumerInterceptors<>(interceptorList);
// key反序列化
            if (keyDeserializer == null) {
                this.keyDeserializer =
                        config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                                Deserializer.class);
                this.keyDeserializer.configure(config.originals(), true);
            } else {
                config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
                this.keyDeserializer = keyDeserializer;
            }
// value反序列化
            if (valueDeserializer == null) {
                this.valueDeserializer =
                        config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                                Deserializer.class);
                this.valueDeserializer.configure(config.originals(), false);
            } else {
                config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
                this.valueDeserializer = valueDeserializer;
            }
            ClusterResourceListeners clusterResourceListeners =
                    configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters,
                            interceptorList);
            this.metadata = new Metadata(retryBackoffMs,
                    config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
                    true, false, clusterResourceListeners);
            List<InetSocketAddress> addresses =
                    ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVER
                            S_CONFIG));
            this.metadata.update(Cluster.bootstrap(addresses), Collections.
                    <String>emptySet(), 0);
            String metricGrpPrefix = "consumer";
            ConsumerMetrics metricsRegistry = new
                    ConsumerMetrics(metricsTags.keySet(), "consumer");
            ChannelBuilder channelBuilder =
                    ClientUtils.createChannelBuilder(config);
// 事务隔离级别
            IsolationLevel isolationLevel = IsolationLevel.valueOf(
                    config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
            Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics,
                    metricsRegistry.fetcherMetrics);
// 网络组件
            NetworkClient netClient = new NetworkClient(
                    new
                            Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics,
                            time, metricGrpPrefix, channelBuilder, logContext),
                    this.metadata,
                    clientId,
                    100, // a fixed large enough value will suffice for max in-flight requests
                    
                    config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                    config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                    time,
                    true,
                    new ApiVersions(),
                    throttleTimeSensor,
                    logContext);
// 客户端
            this.client = new ConsumerNetworkClient(
                    logContext,
                    netClient,
                    metadata,
                    time,
                    retryBackoffMs,
                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
// offset重置策略,默认是自动提交
            OffsetResetStrategy offsetResetStrategy =
                    OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
                    ).toUpperCase(Locale.ROOT));
            this.subscriptions = new SubscriptionState(offsetResetStrategy);
            this.assignors = config.getConfiguredInstances(
                    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                    PartitionAssignor.class);
// offset协调者
            this.coordinator = new ConsumerCoordinator(logContext,
                    this.client,
                    groupId,
                    config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                    config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                    config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                    assignors,
                    this.metadata,
                    this.subscriptions,
                    metrics,
                    metricGrpPrefix,
                    this.time,
                    retryBackoffMs,
                    config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
                    config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                    this.interceptors,
                    config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
                    config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
// 拉取器
            this.fetcher = new Fetcher<>(
                    logContext,
                    this.client,
                    config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                    config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                    config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                    config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                    config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                    config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
                    this.keyDeserializer,
                    this.valueDeserializer,
                    this.metadata,
                    this.subscriptions,
                    metrics,
                    metricsRegistry.fetcherMetrics,
                    this.time,
                    this.retryBackoffMs,
                    isolationLevel);
// 打印用户设置,但是没有使用的配置项
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
            log.debug("Kafka consumer initialized");
        } catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
            close(0, true);
// now propagate the exception
            throw new KafkaException("Failed to construct kafka consumer", t);
        }
    }
    1).初始化参数配置
	   (1).client.id、group.id、消费者拦截器、key/value序列化、事务隔离级别
    2).初始化网络客户端 NetworkClient
	3).初始化消费者网络客户端 ConsumerNetworkClient
	4).初始化offset提交策略,默认自动提交
	5).初始化消费者协调器 ConsumerCoordinator
	6).初始化拉取器 Fetcher
	
 3.订阅Topic 
   
   下面我们先来看一下subscribe方法都有哪些逻辑：
   public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
    {
// 轻量级锁
        acquireAndEnsureOpen();
        try {
            if (topics == null) {
                throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
            } else if (topics.isEmpty()) {
// topics为空,则开始取消订阅的逻辑
                this.unsubscribe();
            } else {
// topic合法性判断,包含null或者空字符串直接抛异常
                for (String topic : topics) {
                    if (topic == null || topic.trim().isEmpty())
                        throw new IllegalArgumentException("Topic collection to subscribe to cannot
                                contain null or empty topic");
                }
// 如果没有消费协调者直接抛异常
                throwIfNoAssignorsConfigured();
                log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
// 开始订阅
                this.subscriptions.subscribe(new HashSet<>(topics), listener);
// 更新元数据,如果metadata当前不包括所有的topics则标记强制更新
                metadata.setTopics(subscriptions.groupSubscription());
            }
        } finally {
            release();
        }
    }
    public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
        if (listener == null)
            throw new IllegalArgumentException("RebalanceListener cannot be null");
// 按照指定的Topic名字进行订阅,自动分配分区
        setSubscriptionType(SubscriptionType.AUTO_TOPICS);
// 监听
        this.listener = listener;
// 修改订阅信息
        changeSubscription(topics);
    }
    private void changeSubscription(Set<String> topicsToSubscribe) {
        if (!this.subscription.equals(topicsToSubscribe)) {
// 如果使用AUTO_TOPICS或AUTO_PARTITION模式,则使用此集合记录所有订阅的Topic
            this.subscription = topicsToSubscribe;
// Consumer Group中会选⼀个Leader,Leader会使用这个集合记录Consumer Group中所有消费者订阅
// 的Topic,而其他的Follower的这个集合只会保存自身订阅的Topic
            this.groupSubscription.addAll(topicsToSubscribe);
        }
    }
   1).KafkaConsumer不是线程安全类,开启轻量级锁,topics为空抛异常,topics是空集合开始取消订阅,再次
判断topics集合中是否有方法数据,判断消费者协调者是否为空。开始订阅对应topic。listener默认
为 NoOpConsumerRebalanceListener ,一个空操作
       轻量级锁：分别记录了当前使用KafkaConsumer的线程id和重入次数,KafkaConsumer的acquire()和
release()方法实现了一个”轻量级锁“,它并非真正的锁,仅是检测是否有多线程并发操作KafkaConsumer而已
   2).每一个KafkaConsumer实例内部都拥有一个SubscriptionState对象,subscribe内部调用了subscribe
方法,subscribe方法订阅信息记录到 SubscriptionState,多次订阅会覆盖旧数据。
   3).更新metadata,判断如果metadata中不包含当前groupSubscription,开始标记更新(后面会有更新的逻
辑),并且消费者侧的topic不会过期

 4.消息消费过程 
   
   下面KafkaConsumer的核心方法poll是如何拉取消息的,先来看一下下面的代码：
   1).poll
   public ConsumerRecords<K, V> poll(long timeout) {
// 使⽤轻量级锁检测kafkaConsumer是否被其他线程使用
        acquireAndEnsureOpen();
        try {
// 超时时间小于0抛异常
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");
// 订阅类型为NONE抛异常,表示当前消费者没有订阅任何topic或者没有分配分区
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
                throw new IllegalStateException("Consumer is not subscribed to any
                        topics or assigned any partitions");
// poll for new data until the timeout expires
            long start = time.milliseconds();
            long remaining = timeout;
            do {
// 核心方法,拉取消息
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records =
                        pollOnce(remaining);
                if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable
                    pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
// 如果拉取到了消息,发送一次消息拉取的请求,不会阻塞不会被中断
// 在返回数据之前,发送下次的 fetch 请求,避免⽤户在下次获取数据时线程block
                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
                        client.pollNoWakeup();
// 经过拦截器处理后返回
                    if (this.interceptors == null)
                        return new ConsumerRecords<>(records);
                    else
                        return this.interceptors.onConsume(new ConsumerRecords<>
                                (records));
                }
                long elapsed = time.milliseconds() - start;
// 拉取超时就结束
                remaining = timeout - elapsed;
            } while (remaining > 0);
            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }
    (1).使用轻量级锁检测kafkaConsumer是否被其他线程使用
	(2).检查超时时间是否小于0,小于0抛出异常,停止消费
	(3).检查这个 consumer 是否订阅的相应的 topic-partition
	(4).调用 pollOnce() 方法获取相应的 records
	(5).在返回获取的records前,发送下一次的fetch请求,避免用户在下次请求时线程block在pollOnce()方法中 
	(6).如果在给定的时间（timeout）内获取不到可用的 records,返回空数据
	这里可以看出,poll方法的真正实现是在pollOnce方法中,poll方法通过pollOnce方法获取可用的数据
   2).pollOnce
   // 除了获取新数据外,还会做一些必要的 offset-commit和reset-offset的操作
    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        client.maybeTriggerWakeup();
// 1. 获取 GroupCoordinator 地址并连接、加入 Group、sync Group、自动 commit, join
// 及 sync 期间 group 会进行 rebalance
        coordinator.poll(time.milliseconds(), timeout);
// 2. 更新订阅的 topic-partition 的 offset（如果订阅的 topic-partition list 没有有
// 效的 offset 的情况下）
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());
// 3. 获取 fetcher 已经拉取到的数据
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records =
                fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;
// 4. 发送 fetch 请求,会从多个 topic-partition 拉取数据（只要对应的 topic-partition
// 没有未完成的请求）
        fetcher.sendFetches();
        long now = time.milliseconds();
        long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
// 5. 调用poll 方法发送请求（底层发送请求的接⼝）
        client.poll(pollTimeout, now, new PollCondition() {
            @Override
            public boolean shouldBlock() {
// since a fetch might be completed by the background thread, we need
// this poll condition  to ensure that we do not block unnecessarily in poll()
                return !fetcher.hasCompletedFetches();
            }
        });
// 6. 如果 group 需要 rebalance,直接返回空数据,这样更快地让 group 进行稳定状态
        if (coordinator.needRejoin())
            return Collections.emptyMap();
// 获取到请求的结果
        return fetcher.fetchedRecords();
    }
	pollOnce 可以简单分为6步来看,其作用分别如下:
	(1).coordinator.poll()
	获取 GroupCoordinator 的地址,并建立相应tcp连接,发送 join-group、sync-group,之后才真正加入到了一
个group中,这时会获取其要消费的topic-partition列表,如果设置了自动commit,也会在这一步进行commit。
总之,对于i个新建的 group,group 状态将会从 Empty –> PreparingRebalance –> AwaiSync –> Stable；
      (1).获取 GroupCoordinator 的地址,并建立相应 tcp 连接；
	  (2).发送 join-group 请求,然后 group 将会进行 rebalance；
	  (3).发送sync-group请求,之后才正在加入到了一个group中,这时会通过请求获取其要消费的
topic-partition 列表；
	  (4).如果设置了自动 commit，也会在这一步进行 commit offset
	(2).updateFetchPositions()
	这个方法主要是用来更新这个consumer实例订阅的topic-partition列表的fetch-offset
信息.目的就是为了获取其订阅的每个topic-partition对应的position,这样Fetcher才知道
从哪个offset开始去拉取这个topic-partition的数据
    private void updateFetchPositions(Set<TopicPartition> partitions) {
// 先重置那些调用 seekToBegin 和 seekToEnd 的 offset 的 tp,设置其 the fetch
// position 的 offset
        fetcher.resetOffsetsIfNeeded(partitions);
        if (!subscriptions.hasAllFetchPositions(partitions)) {
// 获取所有分配 tp 的 offset, 即 committed offset, 更新到 TopicPartitionState
// 中的 committed offset 中
            coordinator.refreshCommittedOffsetsIfNeeded();
// 如果 the fetch position 值无效,则将上步获取的 committed offset 设置为 the
            fetch position
            fetcher.updateFetchPositions(partitions);
        }
    }
	在 Fetcher中,这个consumer实例订阅的每个topic-partition都会有一个对应的 
TopicPartitionState对象,在这个对象中会记录以下这些内容：
    private static class TopicPartitionState {
        // Fetcher 下次去拉取时的 offset,Fecher 在拉取时需要知道这个值
        private Long position; // last consumed position
        // 最后⼀次获取的高水位标记
        private Long highWatermark; // the high watermark from last fetch
        private Long lastStableOffset;
// consumer 已经处理完的最新一条消息的 offset,consumer 主动调用offset-commit 时会
// 更新这个值；
        private OffsetAndMetadata committed; // last committed position
        // 是否暂停
        private boolean paused; // whether this partition has been paused by the
        user
        // 这 topic-partition offset 重置的策略,重置之后,这个策略就会改为 null,防止再次操作
        private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting
        
    }
	(3).fetcher.fetchedRecords()
	返回其fetched records,并更新其fetch-position offset,只有在offset-commit时
(自动commit时,是在第一步实现的),才会更新其 committed offset；
    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
// 在 max.poll.records 中设置单词最大的拉取条数
        int recordsRemaining = maxPollRecords;
        try {
            while (recordsRemaining > 0) {
                if (nextInLineRecords == null || nextInLineRecords.isFetched) {
// 从队列中获取但不移除此队列的头；如果此队列为空,返回null
                    CompletedFetch completedFetch = completedFetches.peek();
                    if (completedFetch == null) break;
// 获取下一个要处理的 nextInLineRecords
                    nextInLineRecords = parseCompletedFetch(completedFetch);
                    completedFetches.poll();
                } else {
// 拉取records,更新 position
                    List<ConsumerRecord<K, V>> records =
                            fetchRecords(nextInLineRecords, recordsRemaining);
                    TopicPartition partition = nextInLineRecords.partition;
                    if (!records.isEmpty()) {
                        List<ConsumerRecord<K, V>> currentRecords =
                                fetched.get(partition);
                        if (currentRecords == null) {
                            fetched.put(partition, records);
                        } else {
                            List<ConsumerRecord<K, V>> newRecords = new ArrayList<>
                                    (records.size() + currentRecords.size());
                            newRecords.addAll(currentRecords);
                            newRecords.addAll(records);
                            fetched.put(partition, newRecords);
                        }
                        recordsRemaining -= records.size();
                    }
                }
            }
        } catch (KafkaException e) {
            if (fetched.isEmpty())
                throw e;
        }
        return fetched;
    }
    private List<ConsumerRecord<K, V>> fetchRecords(PartitionRecords
                                                            partitionRecords, int maxRecords) {
        if (!subscriptions.isAssigned(partitionRecords.partition)) {
            log.debug("Not returning fetched records for partition {} since it is no
                    longer assigned",
                    partitionRecords.partition);
        } else {
            long position = subscriptions.position(partitionRecords.partition);
// 这个 tp 不能来消费了,比如调用 pause方法暂停消费
            if (!subscriptions.isFetchable(partitionRecords.partition)) {
                log.debug("Not returning fetched records for assigned partition {}
                        since it is no longer fetchable",
                        partitionRecords.partition);
            } else if (partitionRecords.nextFetchOffset == position) {
// 获取该 tp 对应的records,并更新 partitionRecords 的 fetchOffset(用于判断是否顺序)
                List<ConsumerRecord<K, V>> partRecords =
                        partitionRecords.fetchRecords(maxRecords);
                long nextOffset = partitionRecords.nextFetchOffset;
                log.trace("Returning fetched records at offset {} for assigned
                        partition {} and update " +
                "position to {}", position, partitionRecords.partition,
                        nextOffset);
// 更新消费的到 offset（ the fetch position）
                subscriptions.position(partitionRecords.partition, nextOffset);
// 获取 Lag（即 position与 hw 之间差值）,hw 为 null 时,才返回 null
                Long partitionLag =
                        subscriptions.partitionLag(partitionRecords.partition, isolationLevel);
                if (partitionLag != null)
                    this.sensors.recordPartitionLag(partitionRecords.partition,
                            partitionLag);
                return partRecords;
            } else {
                log.debug("Ignoring fetched records for {} at offset {} since the
                        current position is {}",
                partitionRecords.partition, partitionRecords.nextFetchOffset,
                        position);
            }
        }
        partitionRecords.drain();
        return emptyList();
    }
	(4).fetcher.sendFetches()
	只要订阅的topic-partition list没有未处理的fetch请求,就发送对这个topic-partition的fetch请求,
在真正发送时,还是会按node级别去发送,leader 是同一个node的topic-partition会合成一个请求去发送；
    // 向订阅的所有 partition （只要该 leader 暂时没有拉取请求）所在 leader 发送 fetch 请求
    public int sendFetches() {
        // 1. 创建 Fetch Request
        Map<Node, FetchRequest.Builder> fetchRequestMap = createFetchRequests();
        for (Map.Entry<Node, FetchRequest.Builder> fetchEntry :
                fetchRequestMap.entrySet()) {
            final FetchRequest.Builder request = fetchEntry.getValue();
            final Node fetchTarget = fetchEntry.getKey();
            log.debug("Sending {} fetch for partitions {} to broker {}",
                    isolationLevel, request.fetchData().keySet(),
                    fetchTarget);
            // 2 发送 Fetch Request
            client.send(fetchTarget, request)
                    .addListener(new RequestFutureListener<ClientResponse>() {
                        @Override
                        public void onSuccess(ClientResponse resp) {
                            FetchResponse response = (FetchResponse)
                                    resp.responseBody();
                            if (!matchesRequestedPartitions(request, response)) {
                                log.warn("Ignoring fetch response containing
                                        partitions {} since it does not match " +
                                "the requested partitions {}",
                                        response.responseData().keySet(),
                                        request.fetchData().keySet());
                                return;
                            }
                            Set<TopicPartition> partitions = new HashSet<>
                                    (response.responseData().keySet());
                            FetchResponseMetricAggregator metricAggregator = new
                                    FetchResponseMetricAggregator(sensors, partitions);
                            for (Map.Entry<TopicPartition,
                                    FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                                TopicPartition partition = entry.getKey();
                                long fetchOffset =
                                        request.fetchData().get(partition).fetchOffset;
                                FetchResponse.PartitionData fetchData =
                                        entry.getValue();
                                log.debug("Fetch {} at offset {} for partition {}
                                        returned fetch data {}",
                                isolationLevel, fetchOffset, partition,
                                        fetchData);
                                completedFetches.add(new CompletedFetch(partition,
                                        fetchOffset, fetchData, metricAggregator,
                                        resp.requestHeader().apiVersion()));
                            }
                            sensors.fetchLatency.record(resp.requestLatencyMs());
                        }
                        @Override
                        public void onFailure(RuntimeException e) {
                            log.debug("Fetch request {} to {} failed",
                                    request.fetchData(), fetchTarget, e);
                        }
                    });
        }
        return fetchRequestMap.size();
    }
	  1).createFetchRequests()为订阅的所有topic-partition list 创建fetch请求(只要该
topic-partition没有还在处理的请求,创建的fetch 请求依然是按照 node 级别创建的;
	  2).client.send():发送fetch请求,并设置相应的Listener.请求处理成功的话,就加入到
completedFetches中,在加入这个completedFetches集合时,是按照topic-partition级别去加入,
这样也就方便了后续的处理。
    从这⾥可以看出,在每次发送 fetch 请求时,都会向所有可发送的topic-partition发送fetch
请求,调用一次fetcher.sendFetches,拉取到的数据,可需要多次pollOnce循环才能处理完,因为
Fetcher线程是在后台运行,这也保证了尽可能少地阻塞用户的处理线程,因为如果 Fetcher 中
没有可处理的数据,用户的线程是会阻塞在poll方法中的
	(5).client.poll()
	调用底层 NetworkClient 提供的接口去发送相应的请求；
	(6).coordinator.needRejoin()
	如果当前实例分配的topic-partition列表发送了变化,那么这个consumer group就需要进行ebalance
 5.自动提交
   
   最简单的提交方式是让悄费者自动提交偏移量。如果enable.auto.commit被设为true,消费者会自动把从
poll()方法接收到的最大偏移量提交上去。提交时间间隔由auto.commit.interval.ms控制,默认值是5s.与
消费者里的其他东西一样,自动提交也是在轮询(poll() )里进行的。消费者每次在进行轮询时会检查是否
该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。
   不过，这种简便的方式也会带来一些问题，来看一下下面的例子：
   假设我们仍然使用默认的5s提交时间间隔,在最近一次提交之后的3s发⽣了再均衡,再均衡之后,消费者从最后一
次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了3s,所以在这3s内到达的消息会被重复处理。可以
通过修改提交时间间隔来更频繁地提交偏移量,减⼩可能出现重复消息的时间窗,不过这种情况是无也完全避免的
 6.手动提交
   
   1).同步提交
   取消自动提交,把auto.commit.offset设为false,让应⽤程序决定何时提交偏移量。使用commitSync() 
提交偏移量最简单也最可靠。这个API会提交由 poll()方法返回 的最新偏移量,提交成功后马上返回,
如果提交失败就抛出异常
   while (true) {
         // 消息拉取
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),
                    record.key(), record.value());
        }
        // 处理完成单次消息以后，提交当前的offset，如果提交失败就抛出异常
        consumer.commitSync();
    }
   2).异步提交
   同步提交有一个不足之处,在 broker对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用
程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发生了再均衡,会增加重复消息的数量。
这个时候可以使用异步提交API。我们只管发送提交请求，无需等待 broker的响应。
   while (true) {
        // 消息拉取
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),
                    record.key(), record.value());
        }
         // 异步提交
        consumer.commitAsync((offsets, exception) -> {
            exception.printStackTrace();
            System.out.println(offsets.size());
        });
    }